/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg;

import java.util.List;
import java.util.Set;
import org.apache.iceberg.events.IncrementalScanEvent;
import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.SnapshotUtil;

class IncrementalDataTableScan extends DataTableScan {
  IncrementalDataTableScan(Table table, Schema schema, TableScanContext context) {
    super(table, schema, context.useSnapshotId(null));
    validateSnapshotIds(table, context.fromSnapshotId(), context.toSnapshotId());
  }

  @Override
  public TableScan asOfTime(long timestampMillis) {
    throw new UnsupportedOperationException(
        String.format(
            "Cannot scan table as of time %s: configured for incremental data in snapshots (%s, %s]",
            timestampMillis, context().fromSnapshotId(), context().toSnapshotId()));
  }

  @Override
  public TableScan useRef(String ref) {
    throw new UnsupportedOperationException(
        String.format(
            "Cannot scan table using ref %s: configured for incremental data in snapshots (%s, %s]",
            ref, context().fromSnapshotId(), context().toSnapshotId()));
  }

  @Override
  public TableScan useSnapshot(long scanSnapshotId) {
    throw new UnsupportedOperationException(
        String.format(
            "Cannot scan table using scan snapshot id %s: configured for incremental data in snapshots (%s, %s]",
            scanSnapshotId, context().fromSnapshotId(), context().toSnapshotId()));
  }

  @Override
  public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) {
    validateSnapshotIdsRefinement(fromSnapshotId, toSnapshotId);
    return new IncrementalDataTableScan(
        table(),
        schema(),
        context().fromSnapshotIdExclusive(fromSnapshotId).toSnapshotId(toSnapshotId));
  }

  @Override
  public TableScan appendsAfter(long newFromSnapshotId) {
    final Snapshot currentSnapshot = table().currentSnapshot();
    Preconditions.checkState(
        currentSnapshot != null,
        "Cannot scan appends after %s, there is no current snapshot",
        newFromSnapshotId);
    return appendsBetween(newFromSnapshotId, currentSnapshot.snapshotId());
  }

  @Override
  public CloseableIterable<FileScanTask> planFiles() {
    Long fromSnapshotId = context().fromSnapshotId();
    Long toSnapshotId = context().toSnapshotId();

    List<Snapshot> snapshots = snapshotsWithin(table(), fromSnapshotId, toSnapshotId);
    Set<Long> snapshotIds = Sets.newHashSet(Iterables.transform(snapshots, Snapshot::snapshotId));
    Set<ManifestFile> manifests =
        FluentIterable.from(snapshots)
            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
            .filter(manifestFile -> snapshotIds.contains(manifestFile.snapshotId()))
            .toSet();

    ManifestGroup manifestGroup =
        new ManifestGroup(table().io(), manifests)
            .caseSensitive(isCaseSensitive())
            .select(scanColumns())
            .filterData(filter())
            .filterManifestEntries(
                manifestEntry ->
                    snapshotIds.contains(manifestEntry.snapshotId())
                        && manifestEntry.status() == ManifestEntry.Status.ADDED)
            .specsById(table().specs())
            .ignoreDeleted()
            .columnsToKeepStats(columnsToKeepStats());

    if (shouldIgnoreResiduals()) {
      manifestGroup = manifestGroup.ignoreResiduals();
    }

    Listeners.notifyAll(
        new IncrementalScanEvent(
            table().name(), fromSnapshotId, toSnapshotId, filter(), schema(), false));

    if (manifests.size() > 1 && shouldPlanWithExecutor()) {
      manifestGroup = manifestGroup.planWith(planExecutor());
    }

    return manifestGroup.planFiles();
  }

  @Override
  @SuppressWarnings("checkstyle:HiddenField")
  protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext context) {
    return new IncrementalDataTableScan(table, schema, context);
  }

  private static List<Snapshot> snapshotsWithin(
      Table table, long fromSnapshotId, long toSnapshotId) {
    List<Snapshot> snapshots = Lists.newArrayList();
    for (Snapshot snapshot :
        SnapshotUtil.ancestorsBetween(toSnapshotId, fromSnapshotId, table::snapshot)) {
      // for now, incremental scan supports only appends
      if (snapshot.operation().equals(DataOperations.APPEND)) {
        snapshots.add(snapshot);
      } else if (snapshot.operation().equals(DataOperations.OVERWRITE)) {
        throw new UnsupportedOperationException(
            String.format(
                "Found %s operation, cannot support incremental data in snapshots (%s, %s]",
                DataOperations.OVERWRITE, fromSnapshotId, toSnapshotId));
      }
    }
    return snapshots;
  }

  private void validateSnapshotIdsRefinement(long newFromSnapshotId, long newToSnapshotId) {
    Set<Long> snapshotIdsRange =
        Sets.newHashSet(
            SnapshotUtil.ancestorIdsBetween(
                context().toSnapshotId(), context().fromSnapshotId(), table()::snapshot));
    // since snapshotIdsBetween return ids in range (fromSnapshotId, toSnapshotId]
    snapshotIdsRange.add(context().fromSnapshotId());
    Preconditions.checkArgument(
        snapshotIdsRange.contains(newFromSnapshotId),
        "from snapshot id %s not in existing snapshot ids range (%s, %s]",
        newFromSnapshotId,
        context().fromSnapshotId(),
        newToSnapshotId);
    Preconditions.checkArgument(
        snapshotIdsRange.contains(newToSnapshotId),
        "to snapshot id %s not in existing snapshot ids range (%s, %s]",
        newToSnapshotId,
        context().fromSnapshotId(),
        context().toSnapshotId());
  }

  private static void validateSnapshotIds(Table table, long fromSnapshotId, long toSnapshotId) {
    Preconditions.checkArgument(
        fromSnapshotId != toSnapshotId, "from and to snapshot ids cannot be the same");
    Preconditions.checkArgument(
        table.snapshot(fromSnapshotId) != null, "from snapshot %s does not exist", fromSnapshotId);
    Preconditions.checkArgument(
        table.snapshot(toSnapshotId) != null, "to snapshot %s does not exist", toSnapshotId);
    Preconditions.checkArgument(
        SnapshotUtil.isAncestorOf(table, toSnapshotId, fromSnapshotId),
        "from snapshot %s is not an ancestor of to snapshot  %s",
        fromSnapshotId,
        toSnapshotId);
  }
}
